{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Python RQ初学者指南\n",
    "## 让 Python 任务排队执行"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "toc": true
   },
   "source": [
    "<h1>目录<span class=\"tocSkip\"></span></h1>\n",
    "<div class=\"toc\"><ul class=\"toc-item\"><li><span><a href=\"#让-Python-任务排队执行\" data-toc-modified-id=\"让-Python-任务排队执行-1\">让 Python 任务排队执行</a></span><ul class=\"toc-item\"><li><span><a href=\"#引言\" data-toc-modified-id=\"引言-1.1\">引言</a></span></li><li><span><a href=\"#为什么我的-Python程序需要队列？\" data-toc-modified-id=\"为什么我的-Python程序需要队列？-1.2\">为什么我的 Python程序需要队列？</a></span><ul class=\"toc-item\"><li><span><a href=\"#第一步，你需要有个-redis。\" data-toc-modified-id=\"第一步，你需要有个-redis。-1.2.1\">第一步，你需要有个 redis。</a></span></li><li><span><a href=\"#第二步-安装-RQ\" data-toc-modified-id=\"第二步-安装-RQ-1.2.2\">第二步 安装 RQ</a></span></li><li><span><a href=\"#第三步-建立Redis-Queue\" data-toc-modified-id=\"第三步-建立Redis-Queue-1.2.3\">第三步 建立Redis Queue</a></span></li><li><span><a href=\"#第四步-准备好你的任务\" data-toc-modified-id=\"第四步-准备好你的任务-1.2.4\">第四步 准备好你的任务</a></span></li><li><span><a href=\"#第五步-把任务加入RQ队列\" data-toc-modified-id=\"第五步-把任务加入RQ队列-1.2.5\">第五步 把任务加入RQ队列</a></span></li><li><span><a href=\"#第六步-起个工人开始干活吧\" data-toc-modified-id=\"第六步-起个工人开始干活吧-1.2.6\">第六步 起个工人开始干活吧</a></span></li><li><span><a href=\"#第七步-如果-Job-间有依赖关系\" data-toc-modified-id=\"第七步-如果-Job-间有依赖关系-1.2.7\">第七步 如果 Job 间有依赖关系</a></span></li><li><span><a href=\"#出错处理\" data-toc-modified-id=\"出错处理-1.2.8\">出错处理</a></span></li></ul></li></ul></li></ul></div>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "### 引言\n",
    "\n",
    "已经有一些关于 Python RQ 的教程了，比如[简易教程-RQ](https://www.twle.cn/go/rq)，\n",
    "\n",
    "我写这篇 Python RQ 指南时站在工程师的角度，让你通过动手一步步走下来，看完这一篇文章基本搞懂 RQ怎么用。\n",
    "\n",
    "我在这篇文章中也会带入我用 Python 处理数据时的一些思考。\n",
    "\n",
    "我这个notebook可以从github下载。\n",
    "\n",
    "https://github.com/hugulas/cqyblog/blob/master/python/Python%20RQ%E5%88%9D%E5%AD%A6%E8%80%85%E6%8C%87%E5%8D%97.ipynb\n",
    "\n",
    "### 为什么我的 Python程序需要队列？\n",
    "\n",
    "我学习使用 Python 已经有一段时间了，主要是用做数据处理，需要把数据从机器上下载下来，经过处理后放到结构化的数据库中。我自己设计的爬虫需要用到这个 Python 程序，它每天定时执行，把需要处理的数据下载产生报表。爬虫要按照顺序执行好几个步骤，比如下载数据，解析数据，存储到数据库，然后产生摘要。我的同事还设计了 web 应用，允许用户手动调用这些步骤。\n",
    "\n",
    "但是，让人困扰的是怎么监控每个任务的运行状态，通过日志和任务进程，我个人感觉都不是规范的做法。当出现错误时候，怎么处理也是问题啊。\n",
    "\n",
    "#### 第一步，你需要有个 redis。\n",
    "\n",
    "如果你是在Linux x86或者 mac 上，这条 docker命令能让你一步拥有redis 环境。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "docker run --name my-redis-container -p 6379:6379 -d redis"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "如果你的环境是LinuxOne/s390x，那就用下面这条命令。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "docker run --name my-redis-container -p 6379:6379 -d s390x/redis"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第二步 安装 RQ"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: rq in /usr/local/lib/python2.7/site-packages (1.3.0)\n",
      "Requirement already satisfied: click>=5.0 in /usr/local/lib/python2.7/site-packages (from rq) (7.1.1)\n",
      "Requirement already satisfied: redis>=3.0.0 in /usr/local/lib/python2.7/site-packages (from rq) (3.4.1)\n",
      "Requirement already satisfied: lxml in /usr/local/lib/python2.7/site-packages (4.2.5)\n",
      "Requirement already satisfied: requests in /usr/local/lib/python2.7/site-packages (2.21.0)\n",
      "Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python2.7/site-packages (from requests) (2017.4.17)\n",
      "Requirement already satisfied: chardet<3.1.0,>=3.0.2 in /usr/local/lib/python2.7/site-packages (from requests) (3.0.4)\n",
      "Requirement already satisfied: idna<2.9,>=2.5 in /usr/local/lib/python2.7/site-packages (from requests) (2.6)\n",
      "Requirement already satisfied: urllib3<1.25,>=1.21.1 in /usr/local/lib/python2.7/site-packages (from requests) (1.24.1)\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "You are using pip version 18.1, however version 20.0.2 is available.\n",
      "You should consider upgrading via the 'pip install --upgrade pip' command.\n",
      "You are using pip version 18.1, however version 20.0.2 is available.\n",
      "You should consider upgrading via the 'pip install --upgrade pip' command.\n",
      "You are using pip version 18.1, however version 20.0.2 is available.\n",
      "You should consider upgrading via the 'pip install --upgrade pip' command.\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "pip install --user rq\n",
    "# 为了执行后面的测试程序，我们还需要安装 lxml和 requests,但是这不是RQ需要的\n",
    "pip install --user lxml\n",
    "pip install --user requests"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第三步 建立Redis Queue\n",
    "\n",
    "默认的 RQ 是连接到 redis://@localhost:6379/0。如果你的配置不是这样，自己去查参数吧。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from redis import Redis\n",
    "from rq import Queue\n",
    "\n",
    "q = Queue(connection=Redis())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第四步 准备好你的任务\n",
    "\n",
    "你需要随便写个函数，并把他放在某个独立的 python 文件中。注：RQ不允许使用当前“\\_\\_main\\_\\_”所在的文件中的函数。所以，不要偷懒，单独准备个文件吧。'\n",
    "\n",
    "下面这个例子时官方文档提供的，会点网页的词数, 代码如下:\n",
    "\n",
    "代码在 [my_module.py](my_module.py)\n",
    "\n",
    "```python\n",
    "import requests\n",
    "\n",
    "def count_words_at_url(url):\n",
    "    resp = requests.get(url)\n",
    "    return len(resp.text.split())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第五步 把任务加入RQ队列"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from my_module import count_words_at_url\n",
    "result = q.enqueue(\n",
    "             count_words_at_url, 'http://nvie.com')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "RQ不仅仅提供了 python 库，还提供了rq命令。通过\"rq info\"子命令，我们可以看到任务已经在队列里面了。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "default      |█ 1\n",
      "1 queues, 1 jobs total\n",
      "\n",
      "31b8a4a03a524c1a9a106d00419f8532 (miaocx-mbp.local 27711): busy default\n",
      "1 workers, 1 queues\n",
      "\n",
      "Updated: 2020-03-26 14:33:13.925443\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "rq info"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第六步 起个工人开始干活吧\n",
    "\n",
    "我们可以通过 rq worker子命令启动一个worker来处理任务，但是要注意的是\n",
    "1. 如果你的机器上有多个Python环境，请把Path指向你要用的python可执行程序所在的目录。\n",
    "2. 把 PYTHONPATH指向mymodule.py所在目录\n",
    "\n",
    "但是因为worker进程是在后台一直运行的，所以需要你在自己的终端里面启动。\n",
    "\n",
    "```bash\n",
    "$ export PATH=~/my_env/venv/bin:$PATH\n",
    "$ export PYTHONPATH=~/my_folder/cqyblog/python/:$PYTHONPATH\n",
    "$ rq worker\n",
    "```\n",
    "\n",
    "\n",
    "\n",
    "你再次打开 rq info，就可以看到任务是不是执行完了。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "default      |█ 1\n",
      "1 queues, 1 jobs total\n",
      "\n",
      "31b8a4a03a524c1a9a106d00419f8532 (miaocx-mbp.local 27711): busy default\n",
      "1 workers, 1 queues\n",
      "\n",
      "Updated: 2020-03-26 14:33:29.341319\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "rq info"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 第七步 如果 Job 间有依赖关系\n",
    "\n",
    "我们的任务可能相互之间有先后关系或者说依赖关系。\n",
    "\n",
    "比如说，我们要先下载页面，然后找出上面全部连接，分成两步走。\n",
    "\n",
    "\"download_page\"方法，把页面下载到本地content.txt. count_links负责分析link, 通过depends_on=job_id指定依赖关系。\n",
    "只有\"download_page\"成功了，才会执行 下一步。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from my_module import download_page, count_links\n",
    "job1 = q.enqueue(\n",
    "             download_page, 'http://nvie.com')\n",
    "job2 = q.enqueue(count_links, depends_on=job1.id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "如果我们的worker还活着，它就会依次执行job1, job2。这里我故意把程序写错了，job1的输出时\"context.txt\", job2却没有去读“context.txt\", 而是读取\"page.txt\"。rq worker 里面就会看到下面的错误。\n",
    "\n",
    "```\n",
    "FileNotFoundError: [Errno 2] No such file or directory: 'page.txt'\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 出错处理\n",
    "rq info给出的信息还是比较有限的。如果有些任务失败了，我们怎么去处理呢？\n",
    "\n",
    "我们可以打印出出错的job id和出错信息等信息。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1172495b-fe0b-4813-8a3a-fb0a6bd14052\n",
      "Traceback (most recent call last):\n",
      "  File \"/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/worker.py\", line 886, in perform_job\n",
      "    rv = job.perform()\n",
      "  File \"/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/job.py\", line 664, in perform\n",
      "    self._result = self._execute()\n",
      "  File \"/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/job.py\", line 670, in _execute\n",
      "    return self.func(*self.args, **self.kwargs)\n",
      "  File \"/Users/cqy/Box Sync/cqyblog/python/my_module.py\", line 17, in count_links\n",
      "    with open(\"page.txt\",'r') as content_file:\n",
      "FileNotFoundError: [Errno 2] No such file or directory: 'page.txt'\n",
      "\n"
     ]
    }
   ],
   "source": [
    "for job_id in q.failed_job_registry.get_job_ids():\n",
    "    job=q.fetch_job(job_id)\n",
    "    print(job_id)\n",
    "    print(job.exc_info)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们现在可以去把 content.txt 拷贝到 page.txt，手动解决这个问题。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "cp content.txt page.txt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后我们可以把这个job重新加入队列。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "job = q.failed_job_registry.requeue(job_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在 worker 那里，我们就能看到它顺利执行完了。\n",
    "\n",
    "```\n",
    "14:43:35 default: my_module.count_links() (1172495b-fe0b-4813-8a3a-fb0a6bd14052)\n",
    "['/', '/posts/', '/about/', '/posts/git-power-tools/', '/posts/introducing-decoders/', '/posts/why-you-should-consider-technical-debt-to-be-real-debt/', '/posts/beautiful-code/', '/posts/a-successful-git-branching-model/', '/posts/']\n",
    "14:43:35 default: Job OK (1172495b-fe0b-4813-8a3a-fb0a6bd14052)\n",
    "14:43:35 Result is kept for 500 seconds\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "注：如果需要删除失败的job,可以执行下面这段。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "q.failed_job_registry.remove(\"1440f09a-2921-439b-bde9-27abfd598041\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": true,
   "title_cell": "目录",
   "title_sidebar": "Contents",
   "toc_cell": true,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
